关于Flink 本地测试,自定义WebUI 端口的方法 您所在的位置:网站首页 flink 本地执行 关于Flink 本地测试,自定义WebUI 端口的方法

关于Flink 本地测试,自定义WebUI 端口的方法

2023-08-06 02:17| 来源: 网络整理| 查看: 265

以1.11.1版本举例,相差不大的版本之间大同小异。 先给成品:以Scala代码举例,Java大同小异。 通过反射将配置加入env的配置对象中。之后使用修改过的env来创建flink的任务流即可。

val env = StreamExecutionEnvironment.getExecutionEnvironment val javaEnv: environment.StreamExecutionEnvironment = env.getJavaEnv val field = classOf[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment].getDeclaredField("configuration") field.setAccessible(true) import org.apache.flink.configuration.Configuration val configuration: Configuration = field.get(javaEnv).asInstanceOf[Configuration] configuration.setString("rest.bind-port", "8081")

下面是探索过程,没兴趣的可以过了。

当我们加入了pom依赖后.发现能够看到本地IDE中的flink的webUI了.

org.apache.flink flink-runtime-web_2.11 ${flink.version} compile

根据日志中显示可知我们的本地web端口为16434. 这不是一个我们想要看到的. 而且每一次运行都会产生一个随机的端口.这实在很痛苦.

17:15:28,577 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Rest endpoint listening at localhost:16434 17:15:28,578 INFO org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService - Proposing leadership to contender http://localhost:16434 17:15:28,581 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Web frontend listening at http://localhost:16434 17:15:28,581 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://localhost:16434 was granted leadership with leaderSessionID=eb84fead-f735-4350-aff4-a7f883013432

所以我们要想办法来固定端口.最好可以自定义. 来看源码 根据日志定位到org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint这个类,但是进去之后你会发现里面并没有期望中的日志内容。很坑,这是一个异步调用的中转站。 那么我们就需要知道这样一条日志出现在哪里了

Rest endpoint listening at

首先我们目前所在的地方是org.apache.flink:flink-runtime-web。根据(1)关联模块的命名相近原则以及 (2)DispatcherRestEndpoint中大量的 flink-runtime模块类调用。我们可以认为我们想要找的东西在org.apache.flink:flink-runtime中。

那么前往flink-runtime_2.11-1.11.1.jar源码下进行全目录检索。发现了这条日志的藏身之所:org.apache.flink.runtime.rest.RestServerEndpoint

然后就是一步一步往回探了。线索图如下,我们发现源自restBindPortRange全局变量。

在这里插入图片描述 是这里 this.restBindPortRange = configuration.getRestBindPortRange(); 是从configuration对象中拿键为org.apache.flink.configuration.RestOptions.BIND_PORT的值的值。

而这个变量来自于本类的构造函数通过参数传入的。这个参数的类型是RestServerEndpointConfiguration

然后就是一步一步的对这个config对象寻根溯源。 得利于IDE的美妙。我们终于找到了他的发源地,因为接口继承的关系,有两种来源。一个是ClusterEntrypoint一个是MiniCluster,由于我们是本地调试,所以只要看后者即可。即org.apache.flink.runtime.minicluster.MiniCluster中的 final Configuration configuration = miniClusterConfiguration.getConfiguration();

所以我们只要把配置想办法加进去即可。 是的,我们找到了,这个配置对象最初始最初始的状态就是在我们的老朋友StreamExecutionEnvironment中装载的。甚至new都是在这里。但是这个对象很遗憾。

private final Configuration configuration;

私有,且没有public方法能get到。没办法了,我们只能祭出大杀器,反射。

按照开头给出的代码,实现configuration动态修改,再使用修改之后的env来创建Flink的任务流。 然后我们就能发现我们能在本地固定的8081端口打开Flink的WebUI了。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有